
Chapter 29: Recursion in Data Processing

Processing Hierarchical Data (JSON, XML)

Hierarchical data structures—JSON objects, XML documents, nested dictionaries—are naturally recursive. Each node can contain more nodes of the same type, creating arbitrary depth. This is where recursion shines: the structure of the data matches the structure of the code.

Let's build a practical tool that processes real configuration files. Our anchor example is a configuration analyzer that traverses nested configuration data to extract insights; we will grow it, iteration by iteration, from a simple string finder into a production-ready analyze_json_config().

The Reference Problem: Configuration File Analysis

Modern applications use nested configuration files. Consider this Python dictionary representing a microservices configuration:

config = {
    "service": "api-gateway",
    "version": "2.1.0",
    "database": {
        "host": "localhost",
        "port": 5432,
        "credentials": {
            "username": "admin",
            "password": "secret123"
        }
    },
    "services": {
        "auth": {
            "url": "http://auth-service:8080",
            "timeout": 30
        },
        "payment": {
            "url": "http://payment-service:8081",
            "timeout": 60,
            "retry": {
                "max_attempts": 3,
                "backoff": 2
            }
        }
    }
}

Our goal: Find all string values in this nested structure, regardless of depth.

def find_all_strings(data):
    """Find all string values in nested dict/list structure."""
    if isinstance(data, str):
        return [data]

    # Try to iterate - works for both dict and list
    result = []
    for item in data:
        result.extend(find_all_strings(item))
    return result

# Test with our config
config = {
    "service": "api-gateway",
    "version": "2.1.0",
    "database": {
        "host": "localhost",
        "port": 5432,
        "credentials": {
            "username": "admin",
            "password": "secret123"
        }
    }
}

strings = find_all_strings(config)
print(f"Found {len(strings)} strings:")
for s in strings:
    print(f"  - {s}")

Python Output:

Found 3 strings:
  - service
  - version
  - database

Diagnostic Analysis: Understanding the Failure

There is no exception this time; the program runs to completion, but the result is wrong. We expected values like "api-gateway" and "localhost", yet we got back only the top-level keys. Let's trace what happened:

find_all_strings(config)
  → data is dict, not string
  → for item in config:  # Iterates over KEYS: "service", "version", "database"
    → item = "service"
    → find_all_strings("service")
      → data is string! Return ["service"]
    → item = "version"
    → find_all_strings("version")
      → data is string! Return ["version"]
    → item = "database"
    → find_all_strings("database")
      → data is string! Return ["database"]

Wait... we're iterating over dictionary KEYS, not VALUES!

Let's parse this section by section:

  1. The symptom: the output contains dictionary keys ("service", "version", "database"), not values
  2. What this tells us: when we write for item in data on a dictionary, Python iterates over keys, not values
  3. The consequence: the recursion never descends into the nested dictionaries, so values like "localhost" and "secret123" are never reached

  4. A second, hidden problem:

  5. If the structure contained a list of numbers, we would recurse on each number and crash with TypeError: 'int' object is not iterable
  6. So the uniform for item in data loop is wrong for dictionaries and fragile for everything else

  7. What we actually need:

  8. For dictionaries: iterate over values (data.values())
  9. For lists: iterate over items (current behavior is correct)
  10. We need to distinguish between these two cases

Root cause identified: Iterating over dictionary keys instead of values.

Why the current approach can't solve this: We need type-specific iteration logic.

What we need: Separate handling for dictionaries vs lists.

Iteration 1: Handling Dictionaries and Lists Correctly

Current State Recap

Our function tries to iterate over all data types uniformly, but dictionaries require special handling—we need their values, not their keys.

The Fix: Type-Specific Iteration

Let's distinguish between dictionaries and lists:

def find_all_strings(data):
    """Find all string values in nested dict/list structure."""
    # Base case: found a string
    if isinstance(data, str):
        return [data]

    # Recursive case: process nested structure
    result = []

    if isinstance(data, dict):
        # For dicts, iterate over VALUES
        for value in data.values():
            result.extend(find_all_strings(value))
    elif isinstance(data, list):
        # For lists, iterate over ITEMS
        for item in data:
            result.extend(find_all_strings(item))

    return result

# Test with our config
config = {
    "service": "api-gateway",
    "version": "2.1.0",
    "database": {
        "host": "localhost",
        "port": 5432,
        "credentials": {
            "username": "admin",
            "password": "secret123"
        }
    },
    "services": {
        "auth": {
            "url": "http://auth-service:8080",
            "timeout": 30
        },
        "payment": {
            "url": "http://payment-service:8081",
            "timeout": 60,
            "retry": {
                "max_attempts": 3,
                "backoff": 2
            }
        }
    }
}

strings = find_all_strings(config)
print(f"Found {len(strings)} strings:")
for s in strings:
    print(f"  - {s}")

Python Output:

Found 7 strings:
  - api-gateway
  - 2.1.0
  - localhost
  - admin
  - secret123
  - http://auth-service:8080
  - http://payment-service:8081

Success! The function now correctly traverses the nested structure.

Call Stack Visualization

Let's trace how this works for a simplified config:

find_all_strings({"db": {"host": "localhost", "port": 5432}})
  ↓ data is dict
  ↓ for value in data.values():  # value = {"host": "localhost", "port": 5432}
  ↓ find_all_strings({"host": "localhost", "port": 5432})
    ↓ data is dict
    ↓ for value in data.values():  # First: "localhost", then: 5432
    ↓ find_all_strings("localhost")
      ↓ data is string! BASE CASE
      ↑ return ["localhost"]
    ↓ find_all_strings(5432)
      ↓ data is int (not string, not dict, not list)
      ↓ result = []
      ↑ return []  # Empty list for non-string primitives
    ↑ return ["localhost"]
  ↑ return ["localhost"]

Current Limitation

This works, but we're silently ignoring non-string values (integers, booleans, None). What if we want to find all primitive values, not just strings? Or what if we want to track where each value came from (its path in the nested structure)?

Let's address the first limitation next.

Iteration 2: Collecting All Primitive Values

New Scenario: Finding All Configuration Values

Configuration files contain more than strings—they have integers (ports, timeouts), booleans (feature flags), and None values. Let's extend our function to collect all primitive values.

Before (Iteration 1):

def find_all_strings(data):
    if isinstance(data, str):
        return [data]

    result = []
    if isinstance(data, dict):
        for value in data.values():
            result.extend(find_all_strings(value))
    elif isinstance(data, list):
        for item in data:
            result.extend(find_all_strings(item))

    return result

Problem: Only collects strings, ignores integers, booleans, None.

After (Iteration 2):

def find_all_values(data):
    """Find all primitive values (str, int, float, bool, None) in nested structure."""
    # Base case: found a primitive value
    if isinstance(data, (str, int, float, bool, type(None))):
        return [data]

    # Recursive case: process nested structure
    result = []

    if isinstance(data, dict):
        for value in data.values():
            result.extend(find_all_values(value))
    elif isinstance(data, list):
        for item in data:
            result.extend(find_all_values(item))

    return result

# Test with expanded config
config = {
    "service": "api-gateway",
    "version": "2.1.0",
    "database": {
        "host": "localhost",
        "port": 5432,
        "ssl_enabled": True,
        "credentials": {
            "username": "admin",
            "password": "secret123"
        }
    },
    "cache": None,
    "services": {
        "auth": {
            "url": "http://auth-service:8080",
            "timeout": 30
        }
    }
}

values = find_all_values(config)
print(f"Found {len(values)} values:")
for v in values:
    print(f"  - {v!r} ({type(v).__name__})")

Python Output:

Found 10 values:
  - 'api-gateway' (str)
  - '2.1.0' (str)
  - 'localhost' (str)
  - 5432 (int)
  - True (bool)
  - 'admin' (str)
  - 'secret123' (str)
  - None (NoneType)
  - 'http://auth-service:8080' (str)
  - 30 (int)

Improvement: Now captures all primitive values with their types.

Key Change

The base case expanded from:

if isinstance(data, str):

To:

if isinstance(data, (str, int, float, bool, type(None))):

This is a common pattern in recursive data processing: the base case defines what you're collecting, as the sketch below illustrates.
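For example, a minimal sketch of a variant that collects only numeric values (the name find_all_numbers is just illustrative) changes nothing but that base-case check:

def find_all_numbers(data):
    """Collect only numeric values from a nested dict/list structure."""
    # Base case: ints and floats, but not bools (bool is a subclass of int)
    if isinstance(data, (int, float)) and not isinstance(data, bool):
        return [data]

    result = []
    if isinstance(data, dict):
        for value in data.values():
            result.extend(find_all_numbers(value))
    elif isinstance(data, list):
        for item in data:
            result.extend(find_all_numbers(item))
    return result

print(find_all_numbers({"port": 5432, "retry": {"max_attempts": 3}, "debug": True}))
# [5432, 3]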

Complexity Analysis

Time Complexity: O(n), where n = total number of values in the nested structure
- Each value visited exactly once
- Single recursive call per value
- Depth of recursion: maximum nesting depth

Space Complexity: O(d + n), where d = maximum depth, n = number of values
- Call stack depth: d (maximum nesting level)
- Result list: n values
- Each call stores constant data

Recurrence Relation: T(n) = k·T(n/k) + O(1), where k = branching factor
- For a dict with k keys, we make k recursive calls
- Each call processes 1/k of the remaining data
- Solves to O(n) total work

Current Limitation

We know what values exist, but not where they are. If we find "secret123", we don't know it's at config["database"]["credentials"]["password"]. Let's fix that.

Iteration 3: Tracking Paths to Values

New Scenario: Security Audit

Imagine we need to find all password fields in a configuration file. We need to know not just the value "secret123", but that it's located at database.credentials.password. This requires tracking the path to each value.

The Challenge: Accumulating Path Information

We need to pass path information down through recursive calls. This is the accumulator pattern applied to recursion.

Before (Iteration 2):

def find_all_values(data):
    if isinstance(data, (str, int, float, bool, type(None))):
        return [data]

    result = []
    if isinstance(data, dict):
        for value in data.values():
            result.extend(find_all_values(value))
    elif isinstance(data, list):
        for item in data:
            result.extend(find_all_values(item))

    return result

Problem: No way to know where each value came from.

After (Iteration 3):

def find_all_values_with_paths(data, path="root"):
    """
    Find all primitive values with their paths in nested structure.

    Returns list of tuples: (path, value)
    """
    # Base case: found a primitive value
    if isinstance(data, (str, int, float, bool, type(None))):
        return [(path, data)]

    # Recursive case: process nested structure
    result = []

    if isinstance(data, dict):
        for key, value in data.items():
            # Build path: root.database.credentials.password
            new_path = f"{path}.{key}"
            result.extend(find_all_values_with_paths(value, new_path))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            # Build path: root.services[0].url
            new_path = f"{path}[{index}]"
            result.extend(find_all_values_with_paths(item, new_path))

    return result

# Test with config containing sensitive data
config = {
    "service": "api-gateway",
    "database": {
        "host": "localhost",
        "port": 5432,
        "credentials": {
            "username": "admin",
            "password": "secret123"
        }
    },
    "api_keys": ["key1", "key2", "key3"],
    "services": {
        "auth": {
            "url": "http://auth-service:8080",
            "api_key": "auth_secret_key"
        }
    }
}

values = find_all_values_with_paths(config)

# Find all potential secrets (values with "password" or "key" in path)
print("Potential secrets found:")
for path, value in values:
    if "password" in path.lower() or "key" in path.lower():
        print(f"  {path} = {value!r}")

Python Output:

Potential secrets found:
  root.database.credentials.password = 'secret123'
  root.api_keys[0] = 'key1'
  root.api_keys[1] = 'key2'
  root.api_keys[2] = 'key3'
  root.services.auth.api_key = 'auth_secret_key'

Success! We can now identify exactly where sensitive data lives in the configuration.

Call Stack Visualization with Path Accumulation

Let's trace a simplified example:

find_all_values_with_paths({"db": {"pass": "secret"}}, "root")
  ↓ data is dict
  ↓ for key="db", value={"pass": "secret"}
  ↓ new_path = "root.db"
  ↓ find_all_values_with_paths({"pass": "secret"}, "root.db")
    ↓ data is dict
    ↓ for key="pass", value="secret"
    ↓ new_path = "root.db.pass"
    ↓ find_all_values_with_paths("secret", "root.db.pass")
      ↓ data is string! BASE CASE
      ↑ return [("root.db.pass", "secret")]
    ↑ return [("root.db.pass", "secret")]
  ↑ return [("root.db.pass", "secret")]

The Accumulator Pattern

Notice how path is an accumulator parameter:
- It starts with an initial value ("root")
- Each recursive call extends it (f"{path}.{key}")
- The base case uses the accumulated value
- The result carries the full accumulated information

This is a fundamental pattern in recursive data processing.
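If the string building ever becomes a concern, the same accumulator can be a tuple of keys and indices instead of a formatted string. A minimal sketch (the function name is illustrative):

def find_values_with_key_paths(data, path=()):
    """Like find_all_values_with_paths, but accumulates a tuple of keys/indices."""
    if isinstance(data, (str, int, float, bool, type(None))):
        return [(path, data)]

    result = []
    if isinstance(data, dict):
        for key, value in data.items():
            result.extend(find_values_with_key_paths(value, path + (key,)))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            result.extend(find_values_with_key_paths(item, path + (index,)))
    return result

# Yields paths like ('database', 'credentials', 'password') instead of
# "root.database.credentials.password"; join them into strings only when reporting.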

When to Apply This Solution

What it optimizes for:
- Traceability: know exactly where each value came from
- Debugging: identify problematic configuration paths
- Security audits: find sensitive data locations

What it sacrifices:
- Memory: stores path strings for every value
- Simplicity: more complex than just collecting values

When to choose this approach:
- Configuration validation and auditing
- Error reporting (need to show where invalid data is)
- Data migration (need to map old paths to new paths)

When to avoid this approach:
- Simple value extraction where location doesn't matter
- Performance-critical code (path string building has overhead)
- Very deep nesting (path strings become very long)

Current Limitation

This works great for dictionaries and lists, but what about JSON files or XML documents? Let's extend to real file formats.

Iteration 4: Processing JSON Files

New Scenario: Analyzing Real Configuration Files

Real applications store configuration in JSON files. Let's build a tool that analyzes JSON files recursively.

The challenge: JSON files can be large and deeply nested. We need to:
1. Load JSON from a file
2. Process it recursively
3. Generate useful reports

Let's build a complete JSON configuration analyzer:

import json
from pathlib import Path
from typing import Any, List, Tuple

def analyze_json_config(data: Any, path: str = "root") -> dict:
    """
    Recursively analyze JSON configuration structure.

    Returns statistics about the configuration:
    - Total values
    - Values by type
    - Maximum nesting depth
    - All paths with their values
    """
    stats = {
        "total_values": 0,
        "by_type": {},
        "max_depth": 0,
        "paths": []
    }

    def recurse(data: Any, path: str, depth: int):
        # Track maximum depth
        stats["max_depth"] = max(stats["max_depth"], depth)

        # Base case: primitive value
        if isinstance(data, (str, int, float, bool, type(None))):
            stats["total_values"] += 1
            type_name = type(data).__name__
            stats["by_type"][type_name] = stats["by_type"].get(type_name, 0) + 1
            stats["paths"].append((path, data))
            return

        # Recursive case: nested structure
        if isinstance(data, dict):
            for key, value in data.items():
                new_path = f"{path}.{key}"
                recurse(value, new_path, depth + 1)
        elif isinstance(data, list):
            for index, item in enumerate(data):
                new_path = f"{path}[{index}]"
                recurse(item, new_path, depth + 1)

    recurse(data, path, 0)
    return stats

# Create a sample JSON config file
config_data = {
    "app": {
        "name": "MyApp",
        "version": "1.0.0",
        "debug": True
    },
    "database": {
        "primary": {
            "host": "localhost",
            "port": 5432,
            "credentials": {
                "username": "admin",
                "password": "secret123"
            }
        },
        "replicas": [
            {"host": "replica1", "port": 5433},
            {"host": "replica2", "port": 5434}
        ]
    },
    "features": {
        "auth": True,
        "payments": False,
        "analytics": True
    }
}

# Save to file
with open("config.json", "w") as f:
    json.dump(config_data, f, indent=2)

# Load and analyze
with open("config.json", "r") as f:
    config = json.load(f)

stats = analyze_json_config(config)

print("Configuration Analysis")
print("=" * 50)
print(f"Total values: {stats['total_values']}")
print(f"Maximum nesting depth: {stats['max_depth']}")
print(f"\nValues by type:")
for type_name, count in sorted(stats['by_type'].items()):
    print(f"  {type_name}: {count}")

print(f"\nSensitive data locations:")
for path, value in stats['paths']:
    if 'password' in path.lower() or 'secret' in path.lower():
        print(f"  {path} = {'*' * len(str(value))}")  # Mask the value

Python Output:

Configuration Analysis
==================================================
Total values: 14
Maximum nesting depth: 4

Values by type:
  bool: 4
  int: 3
  str: 7

Sensitive data locations:
  root.database.primary.credentials.password = *********

Key Improvements

  1. Nested helper function: recurse() is defined inside analyze_json_config(), giving it access to the stats dictionary without passing it as a parameter
  2. Depth tracking: We pass depth as a parameter and track the maximum
  3. Type statistics: We count values by type using a dictionary
  4. File I/O integration: Real JSON loading and saving

Manual Trace: Depth Tracking

Let's trace how depth tracking works:

analyze_json_config({"db": {"host": "localhost"}})
  ↓ stats = {"max_depth": 0, ...}
  ↓ recurse({"db": {"host": "localhost"}}, "root", depth=0)
    ↓ stats["max_depth"] = max(0, 0) = 0
    ↓ data is dict
    ↓ for key="db", value={"host": "localhost"}
    ↓ recurse({"host": "localhost"}, "root.db", depth=1)
      ↓ stats["max_depth"] = max(0, 1) = 1
      ↓ data is dict
      ↓ for key="host", value="localhost"
      ↓ recurse("localhost", "root.db.host", depth=2)
        ↓ stats["max_depth"] = max(1, 2) = 2  ← Maximum depth reached
        ↓ data is string! BASE CASE
        ↓ stats["total_values"] += 1
        ↑ return
      ↑ return
    ↑ return
  ↑ return stats (with max_depth=2)

Practical Application: Configuration Validation

Let's extend this to validate configuration files:

def validate_config(data: Any, rules: dict, path: str = "root") -> List[str]:
    """
    Validate configuration against rules.

    Rules format:
    {
        "required_keys": ["app.name", "database.host"],
        "forbidden_keys": ["debug"],
        "type_constraints": {
            "database.port": int,
            "app.debug": bool
        }
    }

    Returns list of validation errors.
    """
    errors = []
    found_paths = set()

    def recurse(data: Any, path: str):
        found_paths.add(path)

        # Base case: primitive value
        if isinstance(data, (str, int, float, bool, type(None))):
            # Check type constraints
            if path in rules.get("type_constraints", {}):
                expected_type = rules["type_constraints"][path]
                if not isinstance(data, expected_type):
                    errors.append(
                        f"{path}: expected {expected_type.__name__}, "
                        f"got {type(data).__name__}"
                    )
            return

        # Recursive case
        if isinstance(data, dict):
            for key, value in data.items():
                new_path = f"{path}.{key}"

                # Check forbidden keys
                if new_path in rules.get("forbidden_keys", []):
                    errors.append(f"{new_path}: forbidden key present")

                recurse(value, new_path)
        elif isinstance(data, list):
            for index, item in enumerate(data):
                new_path = f"{path}[{index}]"
                recurse(item, new_path)

    recurse(data, path)

    # Check required keys
    for required in rules.get("required_keys", []):
        full_path = f"root.{required}"
        if full_path not in found_paths:
            errors.append(f"{full_path}: required key missing")

    return errors

# Test validation
config = {
    "app": {
        "name": "MyApp",
        "debug": True  # Forbidden in production
    },
    "database": {
        "host": "localhost",
        "port": "5432"  # Should be int, not string
    }
}

rules = {
    "required_keys": ["app.name", "database.host", "database.port"],
    "forbidden_keys": ["root.app.debug"],
    "type_constraints": {
        "root.database.port": int
    }
}

errors = validate_config(config, rules)

if errors:
    print("Validation errors found:")
    for error in errors:
        print(f"  ❌ {error}")
else:
    print("✓ Configuration valid")

Python Output:

Validation errors found:
  ❌ root.app.debug: forbidden key present
  ❌ root.database.port: expected int, got str

Real-World Use Case: Microservices Configuration

This pattern is used in production systems for:
- Configuration validation: Ensure all required settings are present
- Security audits: Find hardcoded credentials
- Migration tools: Transform old config format to new format (see the sketch below)
- Documentation generation: Auto-generate config documentation from structure
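To make the migration use case concrete, here is a minimal, hypothetical sketch that recursively renames dictionary keys while passing everything else through unchanged (the rename map is invented for the example):

def rename_keys(data, renames):
    """Recursively rebuild a nested structure, renaming dict keys via `renames`."""
    if isinstance(data, dict):
        return {renames.get(key, key): rename_keys(value, renames)
                for key, value in data.items()}
    if isinstance(data, list):
        return [rename_keys(item, renames) for item in data]
    return data  # primitives pass through unchanged

old_config = {"db": {"host": "localhost", "creds": {"user": "admin"}}}
new_config = rename_keys(old_config, {"db": "database", "creds": "credentials"})
# {'database': {'host': 'localhost', 'credentials': {'user': 'admin'}}}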

Complexity Analysis

Time Complexity: O(n), where n = total number of values
- Each value visited exactly once
- Validation checks are O(1) per value
- Path lookups in sets are O(1)

Space Complexity: O(d + n), where d = depth, n = values
- Call stack: O(d) for maximum nesting depth
- found_paths set: O(n) paths stored
- errors list: O(e), where e = number of errors

When to Apply This Solution:
- Configuration management systems
- API response validation
- Data migration pipelines
- Security scanning tools
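XML, mentioned at the start of this chapter, yields to exactly the same recursion, because an element contains child elements of the same type. A minimal sketch using the standard library's xml.etree.ElementTree (the sample document is made up):

import xml.etree.ElementTree as ET

def collect_text_with_paths(element, path=""):
    """Recursively collect (path, text) pairs from an XML element tree."""
    current_path = f"{path}/{element.tag}"
    results = []
    text = (element.text or "").strip()
    if text:
        results.append((current_path, text))
    for child in element:  # iterating an Element yields its child elements
        results.extend(collect_text_with_paths(child, current_path))
    return results

doc = ET.fromstring(
    "<config><database><host>localhost</host><port>5432</port></database></config>"
)
for path, text in collect_text_with_paths(doc):
    print(f"{path} = {text}")
# /config/database/host = localhost
# /config/database/port = 5432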

Directory Tree Operations

File systems are hierarchical structures—directories contain files and other directories. This is another natural fit for recursion. Let's build practical tools for analyzing directory trees.

The Reference Problem: Calculate Directory Size

Goal: Recursively calculate the total size of a directory, including all subdirectories and files.

Iteration 1: Basic Directory Size Calculator

import os
from pathlib import Path

def calculate_directory_size(path: str) -> int:
    """
    Calculate total size of directory in bytes.

    Returns total size including all subdirectories and files.
    """
    total_size = 0

    # Get all items in directory
    for item in os.listdir(path):
        item_path = os.path.join(path, item)

        if os.path.isfile(item_path):
            # Base case: it's a file, add its size
            total_size += os.path.getsize(item_path)
        elif os.path.isdir(item_path):
            # Recursive case: it's a directory, recurse into it
            total_size += calculate_directory_size(item_path)

    return total_size

# Create a test directory structure
test_dir = Path("test_project")
test_dir.mkdir(exist_ok=True)

# Create some files
(test_dir / "main.py").write_text("print('Hello')\n" * 100)
(test_dir / "README.md").write_text("# Project\n" * 50)

# Create subdirectory with files
src_dir = test_dir / "src"
src_dir.mkdir(exist_ok=True)
(src_dir / "module1.py").write_text("def func():\n    pass\n" * 200)
(src_dir / "module2.py").write_text("class MyClass:\n    pass\n" * 150)

# Calculate size
size_bytes = calculate_directory_size(str(test_dir))
size_kb = size_bytes / 1024

print(f"Directory: {test_dir}")
print(f"Total size: {size_bytes:,} bytes ({size_kb:.2f} KB)")

Python Output:

Directory: test_project
Total size: 9,800 bytes (9.57 KB)

Call Stack Visualization

Let's trace the execution for a simple directory structure:

test_project/
├── file1.txt (100 bytes)
├── file2.txt (200 bytes)
└── subdir/
    └── file3.txt (300 bytes)

Execution trace:

calculate_directory_size("test_project")
  ↓ total_size = 0
  ↓ for item in ["file1.txt", "file2.txt", "subdir"]:

  ↓ item = "file1.txt"
  ↓ item_path = "test_project/file1.txt"
  ↓ os.path.isfile() = True
  ↓ total_size += 100  → total_size = 100

  ↓ item = "file2.txt"
  ↓ item_path = "test_project/file2.txt"
  ↓ os.path.isfile() = True
  ↓ total_size += 200  → total_size = 300

  ↓ item = "subdir"
  ↓ item_path = "test_project/subdir"
  ↓ os.path.isdir() = True
  ↓ calculate_directory_size("test_project/subdir")
    ↓ total_size = 0
    ↓ for item in ["file3.txt"]:
    ↓ item = "file3.txt"
    ↓ item_path = "test_project/subdir/file3.txt"
    ↓ os.path.isfile() = True
    ↓ total_size += 300  → total_size = 300
    ↑ return 300
  ↓ total_size += 300  → total_size = 600

  ↑ return 600

Current Limitation

This works, but what if we encounter a permission error or a symbolic link loop? Let's make it more robust.

Iteration 2: Robust Directory Traversal with Error Handling

New Scenario: Handling Real-World File System Issues

Real file systems have complications:
- Permission errors: Can't read certain directories
- Symbolic links: Can create infinite loops
- Broken links: Point to non-existent files
- Special files: Sockets, pipes, device files

Let's make our function production-ready:

Before (Iteration 1):

def calculate_directory_size(path: str) -> int:
    total_size = 0
    for item in os.listdir(path):
        item_path = os.path.join(path, item)
        if os.path.isfile(item_path):
            total_size += os.path.getsize(item_path)
        elif os.path.isdir(item_path):
            total_size += calculate_directory_size(item_path)
    return total_size

Problems:
- Crashes on permission errors
- Follows symbolic links (can cause infinite recursion)
- No error reporting

After (Iteration 2):

import os
from pathlib import Path
from typing import Tuple, List

def calculate_directory_size_safe(
    path: str,
    follow_symlinks: bool = False
) -> Tuple[int, List[str]]:
    """
    Safely calculate directory size with error handling.

    Args:
        path: Directory path to analyze
        follow_symlinks: Whether to follow symbolic links

    Returns:
        Tuple of (total_size_bytes, list_of_errors)
    """
    total_size = 0
    errors = []

    try:
        items = os.listdir(path)
    except PermissionError:
        errors.append(f"Permission denied: {path}")
        return 0, errors
    except OSError as e:
        errors.append(f"OS error reading {path}: {e}")
        return 0, errors

    for item in items:
        item_path = os.path.join(path, item)

        # Check if it's a symbolic link
        if os.path.islink(item_path):
            if not follow_symlinks:
                # Skip symbolic links to avoid loops
                continue
            # If following symlinks, check if target exists
            if not os.path.exists(item_path):
                errors.append(f"Broken symlink: {item_path}")
                continue

        try:
            if os.path.isfile(item_path):
                # Base case: regular file
                total_size += os.path.getsize(item_path)
            elif os.path.isdir(item_path):
                # Recursive case: directory
                subdir_size, subdir_errors = calculate_directory_size_safe(
                    item_path, follow_symlinks
                )
                total_size += subdir_size
                errors.extend(subdir_errors)
        except PermissionError:
            errors.append(f"Permission denied: {item_path}")
        except OSError as e:
            errors.append(f"Error accessing {item_path}: {e}")

    return total_size, errors

# Create test structure with permission issues
test_dir = Path("test_project_safe")
test_dir.mkdir(exist_ok=True)

# Regular files
(test_dir / "file1.txt").write_text("content" * 100)

# Subdirectory
subdir = test_dir / "subdir"
subdir.mkdir(exist_ok=True)
(subdir / "file2.txt").write_text("more content" * 200)

# Create a symbolic link (if on Unix-like system)
try:
    link_path = test_dir / "link_to_subdir"
    if not link_path.exists():
        link_path.symlink_to(subdir)
except OSError:
    pass  # Symbolic links might not be supported

# Calculate size
size, errors = calculate_directory_size_safe(str(test_dir), follow_symlinks=False)

print(f"Directory: {test_dir}")
print(f"Total size: {size:,} bytes ({size/1024:.2f} KB)")
if errors:
    print(f"\nErrors encountered:")
    for error in errors:
        print(f"  ⚠️  {error}")
else:
    print("\n✓ No errors")

# Now try with symlinks enabled
print("\n" + "="*50)
print("With symlink following enabled:")
size_with_links, errors_with_links = calculate_directory_size_safe(
    str(test_dir), follow_symlinks=True
)
print(f"Total size: {size_with_links:,} bytes ({size_with_links/1024:.2f} KB)")

Python Output:

Directory: test_project_safe
Total size: 3,100 bytes (3.03 KB)

✓ No errors

==================================================
With symlink following enabled:
Total size: 5,500 bytes (5.37 KB)

Key Improvements

  1. Error handling: Try-except blocks catch permission errors and OS errors
  2. Error collection: Return errors as a list instead of crashing
  3. Symbolic link handling: Option to follow or skip symlinks
  4. Broken link detection: Check if symlink target exists
  5. Return tuple: (size, errors) provides both result and diagnostics

Diagnostic Analysis: Why Error Handling Matters

Without error handling, here's what happens when you hit a protected directory:

Python Output (without error handling):

Traceback (most recent call last):
  File "dir_size.py", line 45, in <module>
    size = calculate_directory_size("/root")
  File "dir_size.py", line 8, in calculate_directory_size
    for item in os.listdir(path):
PermissionError: [Errno 13] Permission denied: '/root'

With error handling:

Directory: /root
Total size: 0 bytes (0.00 KB)

Errors encountered:
  ⚠️  Permission denied: /root

The program continues and reports what it could access.

When to Apply This Solution

What it optimizes for:
- Robustness: handles real-world file system issues
- Diagnostics: reports what went wrong
- Safety: prevents infinite loops from symlinks

What it sacrifices:
- Simplicity: more complex code
- Performance: extra checks for symlinks and errors

When to choose this approach:
- Production tools that scan arbitrary directories
- System administration scripts
- Backup and sync utilities
- Any tool that processes user-provided paths

When to avoid this approach:
- Controlled environments where you know the structure
- Performance-critical code where errors are impossible
- Simple scripts for personal use

Iteration 3: Finding Files by Pattern

New Scenario: Finding All Python Files

A common task: find all files matching a pattern (e.g., all .py files, all .json configs) in a directory tree.

Let's build a recursive file finder:

import os
from pathlib import Path
from typing import List
import fnmatch

def find_files_by_pattern(
    directory: str,
    pattern: str,
    follow_symlinks: bool = False
) -> List[str]:
    """
    Recursively find all files matching a pattern.

    Args:
        directory: Root directory to search
        pattern: Glob pattern (e.g., "*.py", "test_*.json")
        follow_symlinks: Whether to follow symbolic links

    Returns:
        List of matching file paths
    """
    matches = []

    try:
        items = os.listdir(directory)
    except (PermissionError, OSError):
        return matches  # Skip directories we can't read

    for item in items:
        item_path = os.path.join(directory, item)

        # Skip symlinks if not following them
        if os.path.islink(item_path) and not follow_symlinks:
            continue

        try:
            if os.path.isfile(item_path):
                # Base case: check if file matches pattern
                if fnmatch.fnmatch(item, pattern):
                    matches.append(item_path)
            elif os.path.isdir(item_path):
                # Recursive case: search subdirectory
                subdir_matches = find_files_by_pattern(
                    item_path, pattern, follow_symlinks
                )
                matches.extend(subdir_matches)
        except (PermissionError, OSError):
            continue  # Skip files/dirs we can't access

    return matches

# Create test directory structure
test_dir = Path("test_project_find")
test_dir.mkdir(exist_ok=True)

# Create Python files
(test_dir / "main.py").write_text("# Main module")
(test_dir / "utils.py").write_text("# Utilities")
(test_dir / "README.md").write_text("# Documentation")

# Create subdirectories with more files
src_dir = test_dir / "src"
src_dir.mkdir(exist_ok=True)
(src_dir / "module1.py").write_text("# Module 1")
(src_dir / "module2.py").write_text("# Module 2")
(src_dir / "config.json").write_text('{"key": "value"}')

tests_dir = test_dir / "tests"
tests_dir.mkdir(exist_ok=True)
(tests_dir / "test_main.py").write_text("# Tests for main")
(tests_dir / "test_utils.py").write_text("# Tests for utils")

# Find all Python files
python_files = find_files_by_pattern(str(test_dir), "*.py")

print(f"Found {len(python_files)} Python files:")
for file_path in sorted(python_files):
    # Show relative path for readability
    rel_path = os.path.relpath(file_path, test_dir)
    print(f"  - {rel_path}")

# Find all test files
print("\nTest files:")
test_files = find_files_by_pattern(str(test_dir), "test_*.py")
for file_path in sorted(test_files):
    rel_path = os.path.relpath(file_path, test_dir)
    print(f"  - {rel_path}")

# Find all JSON files
print("\nJSON files:")
json_files = find_files_by_pattern(str(test_dir), "*.json")
for file_path in sorted(json_files):
    rel_path = os.path.relpath(file_path, test_dir)
    print(f"  - {rel_path}")

Python Output:

Found 6 Python files:
  - main.py
  - src/module1.py
  - src/module2.py
  - tests/test_main.py
  - tests/test_utils.py
  - utils.py

Test files:
  - tests/test_main.py
  - tests/test_utils.py

JSON files:
  - src/config.json

Recursion Tree for File Finding

Let's visualize how the recursion explores the directory tree:

find_files_by_pattern("test_project_find", "*.py")
├─ Check: main.py → MATCH! ✓
├─ Check: utils.py → MATCH! ✓
├─ Check: README.md → no match
├─ Recurse: src/
│  ├─ Check: module1.py → MATCH! ✓
│  ├─ Check: module2.py → MATCH! ✓
│  └─ Check: config.json → no match
└─ Recurse: tests/
   ├─ Check: test_main.py → MATCH! ✓
   └─ Check: test_utils.py → MATCH! ✓

Result: [main.py, utils.py, src/module1.py, src/module2.py, 
         tests/test_main.py, tests/test_utils.py]

Practical Extension: File Statistics

Let's extend this to gather statistics about found files:

import os
from pathlib import Path
from typing import Any, Dict, List
import fnmatch
from datetime import datetime

def analyze_files_by_pattern(
    directory: str,
    pattern: str
) -> Dict[str, Any]:
    """
    Find files matching pattern and gather statistics.

    Returns dictionary with:
    - files: list of matching paths
    - total_size: total size in bytes
    - count: number of files
    - largest: path to largest file
    - newest: path to most recently modified file
    """
    stats = {
        "files": [],
        "total_size": 0,
        "count": 0,
        "largest": None,
        "largest_size": 0,
        "newest": None,
        "newest_time": 0
    }

    def recurse(path: str):
        try:
            items = os.listdir(path)
        except (PermissionError, OSError):
            return

        for item in items:
            item_path = os.path.join(path, item)

            try:
                if os.path.isfile(item_path):
                    # Base case: check if file matches
                    if fnmatch.fnmatch(item, pattern):
                        stats["files"].append(item_path)
                        stats["count"] += 1

                        # Get file size
                        size = os.path.getsize(item_path)
                        stats["total_size"] += size

                        # Track largest file
                        if size > stats["largest_size"]:
                            stats["largest"] = item_path
                            stats["largest_size"] = size

                        # Track newest file
                        mtime = os.path.getmtime(item_path)
                        if mtime > stats["newest_time"]:
                            stats["newest"] = item_path
                            stats["newest_time"] = mtime

                elif os.path.isdir(item_path):
                    # Recursive case: search subdirectory
                    recurse(item_path)
            except (PermissionError, OSError):
                continue

    recurse(directory)
    return stats

# Analyze Python files in our test project
stats = analyze_files_by_pattern(str(test_dir), "*.py")

print("Python File Analysis")
print("=" * 50)
print(f"Total files: {stats['count']}")
print(f"Total size: {stats['total_size']:,} bytes ({stats['total_size']/1024:.2f} KB)")

if stats['largest']:
    rel_path = os.path.relpath(stats['largest'], test_dir)
    print(f"\nLargest file: {rel_path}")
    print(f"  Size: {stats['largest_size']:,} bytes")

if stats['newest']:
    rel_path = os.path.relpath(stats['newest'], test_dir)
    mtime = datetime.fromtimestamp(stats['newest_time'])
    print(f"\nMost recently modified: {rel_path}")
    print(f"  Modified: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")

print(f"\nAll files:")
for file_path in sorted(stats['files']):
    rel_path = os.path.relpath(file_path, test_dir)
    size = os.path.getsize(file_path)
    print(f"  - {rel_path} ({size} bytes)")

Python Output:

Python File Analysis
==================================================
Total files: 6
Total size: 77 bytes (0.08 KB)

Largest file: tests/test_utils.py
  Size: 17 bytes

Most recently modified: tests/test_utils.py
  Modified: 2024-01-15 14:32:18

All files:
  - main.py (13 bytes)
  - src/module1.py (10 bytes)
  - src/module2.py (10 bytes)
  - tests/test_main.py (16 bytes)
  - tests/test_utils.py (17 bytes)
  - utils.py (11 bytes)

Complexity Analysis

Time Complexity: O(n), where n = total number of files and directories
- Each file/directory visited exactly once
- Pattern matching is O(m), where m = pattern length (negligible)
- File stat operations are O(1)

Space Complexity: O(d + k), where d = maximum depth, k = matching files
- Call stack: O(d) for directory depth
- Results list: O(k) matching file paths
- Stats dictionary: O(1) constant size

Recurrence Relation: T(n) = T(n₁) + T(n₂) + ... + T(nₖ) + O(1)
- Where n₁, n₂, ..., nₖ are subdirectories
- Each subdirectory processed independently
- Solves to O(n) total work

Real-World Applications

This pattern is used in:
- Build systems: Finding source files to compile
- Linters: Finding code files to analyze
- Backup tools: Finding files to back up
- Search utilities: Finding files by name/pattern
- Code analysis: Finding test files, documentation, etc.
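For everyday pattern matching, note that the standard library already does this walk for you: pathlib.Path.rglob and os.walk traverse directory trees (os.walk even accepts an onerror callback). The hand-rolled recursion above earns its keep when you need custom symlink handling, error collection, or statistics. A brief comparison sketch:

import os
from pathlib import Path

# pathlib: recursive glob; unreadable directories are typically skipped silently
python_files = [str(p) for p in Path("test_project_find").rglob("*.py")]

# os.walk: top-down traversal with optional error reporting
def report_error(err):
    print(f"Skipping {err.filename}: {err}")

for dirpath, dirnames, filenames in os.walk("test_project_find", onerror=report_error):
    for name in filenames:
        if name.endswith(".py"):
            print(os.path.join(dirpath, name))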

Recursive Aggregation

Recursive aggregation combines data from nested structures into summary statistics. We've already seen examples (directory size, file counts), but let's explore more sophisticated aggregation patterns.

The Pattern: Accumulating Results Through Recursion

The key insight: each recursive call returns a partial result, and we combine these results as we return up the call stack.
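In its purest form, each call returns its partial result and the caller combines the returned values. A minimal sketch (the node shape is assumed: a dict with an optional "children" list); the fuller examples below often use a shared stats dictionary instead, trading purity for convenience:

def max_depth(node):
    """Depth of the deepest node, counting the root as depth 0."""
    child_depths = [max_depth(child) for child in node.get("children", [])]
    # Combine the children's partial results on the way back up the stack
    return 1 + max(child_depths) if child_depths else 0

print(max_depth({"children": [{"children": [{}]}]}))  # 2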

Example 1: Nested Sales Data Aggregation

Consider a company with nested organizational structure:

from typing import Dict, Any

def aggregate_sales(org_data: Dict[str, Any]) -> Dict[str, float]:
    """
    Recursively aggregate sales data from organizational hierarchy.

    org_data structure:
    {
        "name": "Division A",
        "sales": 1000,
        "teams": [
            {"name": "Team 1", "sales": 500},
            {"name": "Team 2", "sales": 300, "teams": [...]}
        ]
    }

    Returns aggregated statistics.
    """
    stats = {
        "total_sales": 0,
        "unit_count": 0,
        "max_sales": 0,
        "min_sales": float('inf')
    }

    def recurse(node: Dict[str, Any]):
        # Count this unit
        stats["unit_count"] += 1

        # Add this unit's sales
        sales = node.get("sales", 0)
        stats["total_sales"] += sales
        stats["max_sales"] = max(stats["max_sales"], sales)
        stats["min_sales"] = min(stats["min_sales"], sales)

        # Recurse into sub-teams
        for team in node.get("teams", []):
            recurse(team)

    recurse(org_data)

    # Handle edge case: no units found
    if stats["min_sales"] == float('inf'):
        stats["min_sales"] = 0

    return stats

# Example organizational data
company = {
    "name": "Company HQ",
    "sales": 5000,
    "teams": [
        {
            "name": "Division A",
            "sales": 3000,
            "teams": [
                {"name": "Team A1", "sales": 1500},
                {"name": "Team A2", "sales": 1200}
            ]
        },
        {
            "name": "Division B",
            "sales": 4000,
            "teams": [
                {
                    "name": "Team B1",
                    "sales": 2000,
                    "teams": [
                        {"name": "Sub-team B1a", "sales": 800},
                        {"name": "Sub-team B1b", "sales": 600}
                    ]
                },
                {"name": "Team B2", "sales": 1500}
            ]
        }
    ]
}

stats = aggregate_sales(company)

print("Sales Aggregation Report")
print("=" * 50)
print(f"Total organizational units: {stats['unit_count']}")
print(f"Total sales: ${stats['total_sales']:,}")
print(f"Average sales per unit: ${stats['total_sales'] / stats['unit_count']:,.2f}")
print(f"Highest performing unit: ${stats['max_sales']:,}")
print(f"Lowest performing unit: ${stats['min_sales']:,}")

Python Output:

Sales Aggregation Report
==================================================
Total organizational units: 9
Total sales: $19,600
Average sales per unit: $2,177.78
Highest performing unit: $5,000
Lowest performing unit: $600

Call Stack Visualization: Aggregation Flow

Let's trace how aggregation works for a simplified structure:

company = {
    "name": "HQ", "sales": 100,
    "teams": [
        {"name": "A", "sales": 50},
        {"name": "B", "sales": 30}
    ]
}

Execution trace:

aggregate_sales(company)
   stats = {total_sales: 0, unit_count: 0, ...}
   recurse({"name": "HQ", "sales": 100, "teams": [...]})
     stats["unit_count"] = 1
     stats["total_sales"] = 100
     for team in [{"name": "A", ...}, {"name": "B", ...}]:

     recurse({"name": "A", "sales": 50})
       stats["unit_count"] = 2
       stats["total_sales"] = 150
       no sub-teams
       return

     recurse({"name": "B", "sales": 30})
       stats["unit_count"] = 3
       stats["total_sales"] = 180
       no sub-teams
       return

     return
   return stats = {total_sales: 180, unit_count: 3, ...}

Example 2: Nested Comment Thread Aggregation

Social media platforms have nested comment threads. Let's aggregate statistics:

from typing import List, Dict, Any
from datetime import datetime

def analyze_comment_thread(comment: Dict[str, Any]) -> Dict[str, Any]:
    """
    Recursively analyze nested comment thread.

    comment structure:
    {
        "id": 1,
        "text": "Great post!",
        "likes": 10,
        "timestamp": "2024-01-15T10:00:00",
        "replies": [...]  # Nested comments
    }

    Returns thread statistics.
    """
    stats = {
        "total_comments": 0,
        "total_likes": 0,
        "max_depth": 0,
        "longest_comment": "",
        "longest_length": 0,
        "most_liked": None,
        "most_likes": 0
    }

    def recurse(comment: Dict[str, Any], depth: int):
        # Update depth tracking
        stats["max_depth"] = max(stats["max_depth"], depth)

        # Count this comment
        stats["total_comments"] += 1

        # Aggregate likes
        likes = comment.get("likes", 0)
        stats["total_likes"] += likes

        # Track most liked comment
        if likes > stats["most_likes"]:
            stats["most_likes"] = likes
            stats["most_liked"] = comment.get("text", "")

        # Track longest comment
        text = comment.get("text", "")
        if len(text) > stats["longest_length"]:
            stats["longest_length"] = len(text)
            stats["longest_comment"] = text

        # Recurse into replies
        for reply in comment.get("replies", []):
            recurse(reply, depth + 1)

    recurse(comment, 0)
    return stats

# Example comment thread
thread = {
    "id": 1,
    "text": "This is an amazing article about recursion!",
    "likes": 50,
    "replies": [
        {
            "id": 2,
            "text": "I agree! Very clear explanation.",
            "likes": 20,
            "replies": [
                {
                    "id": 3,
                    "text": "The examples really helped me understand.",
                    "likes": 15,
                    "replies": []
                },
                {
                    "id": 4,
                    "text": "Same here!",
                    "likes": 5,
                    "replies": []
                }
            ]
        },
        {
            "id": 5,
            "text": "Could you explain the time complexity in more detail?",
            "likes": 30,
            "replies": [
                {
                    "id": 6,
                    "text": "Sure! The time complexity is O(n) because we visit each node once.",
                    "likes": 40,
                    "replies": []
                }
            ]
        }
    ]
}

stats = analyze_comment_thread(thread)

print("Comment Thread Analysis")
print("=" * 50)
print(f"Total comments: {stats['total_comments']}")
print(f"Total likes: {stats['total_likes']}")
print(f"Average likes per comment: {stats['total_likes'] / stats['total_comments']:.1f}")
print(f"Maximum thread depth: {stats['max_depth']}")
print(f"\nMost liked comment ({stats['most_likes']} likes):")
print(f'  "{stats["most_liked"]}"')
print(f"\nLongest comment ({stats['longest_length']} characters):")
print(f'  "{stats["longest_comment"]}"')

Python Output:

Comment Thread Analysis
==================================================
Total comments: 6
Total likes: 160
Average likes per comment: 26.7
Maximum thread depth: 2

Most liked comment (50 likes):
  "This is an amazing article about recursion!"

Longest comment (66 characters):
  "Sure! The time complexity is O(n) because we visit each node once."

Recursion Tree: Comment Thread Structure

Comment 1 (depth=0, likes=50)
├─ Comment 2 (depth=1, likes=20)
│  ├─ Comment 3 (depth=2, likes=15)
│  └─ Comment 4 (depth=2, likes=5)
└─ Comment 5 (depth=1, likes=30)
   └─ Comment 6 (depth=2, likes=40)

Aggregation flow:
- Visit Comment 1: total_likes = 50, max_depth = 0
  - Visit Comment 2: total_likes = 70, max_depth = 1
    - Visit Comment 3: total_likes = 85, max_depth = 2
    - Visit Comment 4: total_likes = 90, max_depth = 2
  - Visit Comment 5: total_likes = 120, max_depth = 1
    - Visit Comment 6: total_likes = 160, max_depth = 2

Example 3: Multi-Level Category Aggregation

E-commerce sites have nested product categories. Let's aggregate product counts:

from typing import Dict, List, Any

def aggregate_category_stats(category: Dict[str, Any]) -> Dict[str, Any]:
    """
    Recursively aggregate product statistics across category hierarchy.

    category structure:
    {
        "name": "Electronics",
        "products": 50,
        "subcategories": [...]
    }

    Returns aggregated statistics with breakdown by category.
    """
    result = {
        "name": category["name"],
        "direct_products": category.get("products", 0),
        "total_products": 0,
        "subcategory_count": 0,
        "subcategories": []
    }

    # Start with direct products
    result["total_products"] = result["direct_products"]

    # Recurse into subcategories
    for subcat in category.get("subcategories", []):
        # Recursive call
        subcat_stats = aggregate_category_stats(subcat)

        # Accumulate totals
        result["total_products"] += subcat_stats["total_products"]
        result["subcategory_count"] += 1 + subcat_stats["subcategory_count"]

        # Store subcategory results
        result["subcategories"].append(subcat_stats)

    return result

# Example category hierarchy
catalog = {
    "name": "All Products",
    "products": 0,
    "subcategories": [
        {
            "name": "Electronics",
            "products": 100,
            "subcategories": [
                {
                    "name": "Computers",
                    "products": 50,
                    "subcategories": [
                        {"name": "Laptops", "products": 30},
                        {"name": "Desktops", "products": 20}
                    ]
                },
                {
                    "name": "Phones",
                    "products": 40,
                    "subcategories": []
                }
            ]
        },
        {
            "name": "Clothing",
            "products": 200,
            "subcategories": [
                {"name": "Men", "products": 100},
                {"name": "Women", "products": 100}
            ]
        }
    ]
}

stats = aggregate_category_stats(catalog)

def print_category_tree(stats: Dict[str, Any], indent: int = 0):
    """Helper to print category tree with statistics."""
    prefix = "  " * indent
    print(f"{prefix}{stats['name']}:")
    print(f"{prefix}  Direct products: {stats['direct_products']}")
    print(f"{prefix}  Total products: {stats['total_products']}")
    print(f"{prefix}  Subcategories: {stats['subcategory_count']}")

    for subcat in stats["subcategories"]:
        print()
        print_category_tree(subcat, indent + 1)

print("Product Catalog Analysis")
print("=" * 50)
print_category_tree(stats)

Python Output:

Product Catalog Analysis
==================================================
All Products:
  Direct products: 0
  Total products: 640
  Subcategories: 8

  Electronics:
    Direct products: 100
    Total products: 240
    Subcategories: 4

    Computers:
      Direct products: 50
      Total products: 100
      Subcategories: 2

      Laptops:
        Direct products: 30
        Total products: 30
        Subcategories: 0

      Desktops:
        Direct products: 20
        Total products: 20
        Subcategories: 0

    Phones:
      Direct products: 40
      Total products: 40
      Subcategories: 0

  Clothing:
    Direct products: 200
    Total products: 400
    Subcategories: 2

    Men:
      Direct products: 100
      Total products: 100
      Subcategories: 0

    Women:
      Direct products: 100
      Total products: 100
      Subcategories: 0

The Aggregation Pattern: Key Insights

Pattern structure:
1. Initialize accumulator: Start with zero/empty state
2. Process current node: Add current node's contribution
3. Recurse on children: Get child results
4. Combine results: Merge child results with current
5. Return aggregate: Pass combined result up the stack

When to use this pattern:
- Computing totals across hierarchies (sales, products, file sizes)
- Finding extremes (max, min) in nested structures
- Counting elements at all levels
- Building summary reports from detailed data

Complexity characteristics:
- Time: O(n), where n = total nodes (each visited once)
- Space: O(d), where d = maximum depth (call stack)
- Recurrence: T(n) = Σ T(children) + O(1)

Common Failure Modes and Their Signatures

Symptom: Infinite Recursion on Circular References

Python output pattern:

RecursionError: maximum recursion depth exceeded

Diagnostic clues:
- Traceback shows same function called repeatedly with same or similar arguments
- Happens with data structures that reference themselves (circular references)
- Common in graph-like structures, symbolic links, or poorly designed data

Example scenario:

# Circular reference in data
data = {"name": "A", "children": []}
data["children"].append(data)  # Points back to itself!

# This will crash:
def process(node):
    for child in node["children"]:
        process(child)  # Infinite loop!

Root cause: No mechanism to detect already-visited nodes.

Solution: Track visited nodes with a set:

def process_with_cycle_detection(node, visited=None):
    if visited is None:
        visited = set()

    # Use id() to track object identity
    node_id = id(node)

    if node_id in visited:
        return  # Already processed, skip

    visited.add(node_id)

    # Process node...
    for child in node.get("children", []):
        process_with_cycle_detection(child, visited)

Symptom: TypeError on Unexpected Data Types

Python output pattern:

TypeError: 'int' object is not iterable
TypeError: 'NoneType' object is not subscriptable

Diagnostic clues:
- Happens when code assumes all data is dict/list but encounters primitives
- Common when processing JSON with mixed types
- Often occurs deep in recursion when hitting leaf values

Root cause: Missing type checks before attempting iteration or subscripting.

Solution: Add explicit type checking:

def safe_process(data):
    # Check type before processing
    if isinstance(data, dict):
        for key, value in data.items():
            safe_process(value)
    elif isinstance(data, list):
        for item in data:
            safe_process(item)
    elif isinstance(data, (str, int, float, bool, type(None))):
        # Base case: primitive value
        pass  # Process primitive
    else:
        # Unknown type - log warning or raise error
        print(f"Warning: unexpected type {type(data)}")

Symptom: PermissionError or OSError

Python output pattern:

PermissionError: [Errno 13] Permission denied: '/root/secret'
OSError: [Errno 2] No such file or directory: '/broken/link'

Diagnostic clues:
- Happens when recursing through file systems
- Common with system directories, protected files, broken symlinks
- Crashes entire program if not handled

Root cause: No error handling for file system operations.

Solution: Wrap file operations in try-except:

def safe_directory_walk(path):
    try:
        items = os.listdir(path)
    except (PermissionError, OSError) as e:
        print(f"Skipping {path}: {e}")
        return  # Skip this directory

    for item in items:
        item_path = os.path.join(path, item)
        try:
            if os.path.isdir(item_path):
                safe_directory_walk(item_path)
        except (PermissionError, OSError) as e:
            print(f"Skipping {item_path}: {e}")
            continue

Symptom: Memory Error or Slow Performance

Python output pattern:

RecursionError: maximum recursion depth exceeded
MemoryError
(or the program becomes very slow and unresponsive)

Diagnostic clues:
- Happens with very deep or very wide data structures
- Call stack grows too large
- Common with unbalanced trees or deeply nested JSON

Root cause: Recursion depth exceeds Python's limit (default 1000) or uses too much memory.

Solution: Consider iterative approach with explicit stack:

def iterative_process(root):
    """Iterative version using explicit stack."""
    stack = [root]

    while stack:
        node = stack.pop()

        # Process node...

        # Add children to stack
        if isinstance(node, dict):
            stack.extend(node.values())
        elif isinstance(node, list):
            stack.extend(node)

Symptom: Incorrect Aggregation Results

Python output pattern:

Expected: 100
Got: 0
(or other incorrect value)

Diagnostic clues:
- Aggregation returns wrong totals
- Often happens when forgetting to include current node's value
- Or when not properly combining child results

Root cause: Logic error in aggregation—missing current value or incorrect combination.

Solution: Ensure current node is counted AND child results are combined:

def correct_aggregation(node):
    # Start with current node's value
    total = node.get("value", 0)  # Don't forget this!

    # Add children's totals
    for child in node.get("children", []):
        child_total = correct_aggregation(child)
        total += child_total  # Combine results

    return total
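
A small worked example (hypothetical tree) matching the "Expected: 100" symptom above: the current node's value plus all descendant values sums to 100.

tree = {
    "value": 10,
    "children": [
        {"value": 40},
        {"value": 20, "children": [
            {"value": 30}
        ]},
    ],
}
print(correct_aggregation(tree))   # 10 + 40 + 20 + 30 = 100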

Debugging Workflow: When Your Recursive Data Processing Fails

Step 1: Identify the failure point

- Read the complete traceback
- Note which recursive call failed
- Check the data that caused the failure

Step 2: Add diagnostic prints

def debug_process(data, depth=0):
    indent = "  " * depth
    print(f"{indent}Processing: {type(data).__name__}")

    if isinstance(data, dict):
        print(f"{indent}Keys: {list(data.keys())}")
        for key, value in data.items():
            print(f"{indent}Recursing into key: {key}")
            debug_process(value, depth + 1)
    elif isinstance(data, list):
        print(f"{indent}Length: {len(data)}")
        for index, item in enumerate(data):
            print(f"{indent}Recursing into index: {index}")
            debug_process(item, depth + 1)
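
Running it on a tiny nested dict makes the traversal order visible; with a toy input like this the prints look as follows:

debug_process({"db": {"port": 5432}})

Python Output:

Processing: dict
Keys: ['db']
Recursing into key: db
  Processing: dict
  Keys: ['port']
  Recursing into key: port
    Processing: int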

Step 3: Validate assumptions

- Check data types at each level
- Verify base cases are reached
- Ensure recursive calls make progress toward the base case

Step 4: Test with minimal data

- Create the smallest possible failing example
- Trace execution by hand
- Verify each step matches expectations

Step 5: Apply the appropriate fix

- Circular references → add visited tracking
- Type errors → add type checks
- File errors → add error handling
- Memory issues → consider an iterative approach
- Wrong results → check the aggregation logic

The Complete Journey: From Problem to Production

Summary: Evolution of Our Configuration Analyzer

| Iteration | Problem | Technique Applied | Result | Complexity Impact |
|-----------|---------|-------------------|--------|-------------------|
| 0 | Iterating over dict keys | None | TypeError on integers | N/A |
| 1 | Wrong iteration target | Type-specific iteration | Collects strings only | O(n) time, O(d) space |
| 2 | Missing non-string values | Expanded base case | Collects all primitives | O(n) time, O(d+n) space |
| 3 | Unknown value locations | Accumulator pattern (path) | Tracks paths to values | O(n) time, O(d+n) space |
| 4 | Real file format support | JSON integration, stats | Production-ready tool | O(n) time, O(d+n) space |

Final Implementation: Complete Configuration Analyzer

import json
from typing import Any, Dict, List, Optional, Tuple

class ConfigAnalyzer:
    """Production-ready recursive configuration analyzer."""

    def __init__(self):
        self.stats = {
            "total_values": 0,
            "by_type": {},
            "max_depth": 0,
            "paths": [],
            "errors": []
        }

    def analyze_file(self, filepath: str) -> Dict[str, Any]:
        """Analyze a JSON configuration file."""
        try:
            with open(filepath, 'r') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            self.stats["errors"].append(f"Error loading file: {e}")
            return self.stats

        self._recurse(data, "root", 0)
        return self.stats

    def analyze_data(self, data: Any) -> Dict[str, Any]:
        """Analyze configuration data directly."""
        self._recurse(data, "root", 0)
        return self.stats

    def _recurse(self, data: Any, path: str, depth: int):
        """Recursive analysis implementation."""
        # Track depth
        self.stats["max_depth"] = max(self.stats["max_depth"], depth)

        # Base case: primitive value
        if isinstance(data, (str, int, float, bool, type(None))):
            self.stats["total_values"] += 1
            type_name = type(data).__name__
            self.stats["by_type"][type_name] = \
                self.stats["by_type"].get(type_name, 0) + 1
            self.stats["paths"].append((path, data))
            return

        # Recursive case: nested structure
        try:
            if isinstance(data, dict):
                for key, value in data.items():
                    new_path = f"{path}.{key}"
                    self._recurse(value, new_path, depth + 1)
            elif isinstance(data, list):
                for index, item in enumerate(data):
                    new_path = f"{path}[{index}]"
                    self._recurse(item, new_path, depth + 1)
        except Exception as e:
            self.stats["errors"].append(f"Error at {path}: {e}")

    def find_sensitive_data(self, keywords: Optional[List[str]] = None) -> List[Tuple[str, Any]]:
        """Find potentially sensitive configuration values."""
        if keywords is None:
            keywords = ["password", "secret", "key", "token", "credential"]

        sensitive = []
        for path, value in self.stats["paths"]:
            path_lower = path.lower()
            if any(keyword in path_lower for keyword in keywords):
                sensitive.append((path, value))

        return sensitive

    def generate_report(self) -> str:
        """Generate human-readable analysis report."""
        lines = [
            "Configuration Analysis Report",
            "=" * 50,
            f"Total values: {self.stats['total_values']}",
            f"Maximum nesting depth: {self.stats['max_depth']}",
            "",
            "Values by type:"
        ]

        for type_name, count in sorted(self.stats['by_type'].items()):
            lines.append(f"  {type_name}: {count}")

        sensitive = self.find_sensitive_data()
        if sensitive:
            lines.extend([
                "",
                "⚠️  Sensitive data found:",
            ])
            for path, value in sensitive:
                masked = '*' * len(str(value))
                lines.append(f"  {path} = {masked}")

        if self.stats["errors"]:
            lines.extend([
                "",
                "Errors encountered:",
            ])
            for error in self.stats["errors"]:
                lines.append(f"  ❌ {error}")

        return "\n".join(lines)

# Example usage
config = {
    "app": {
        "name": "MyApp",
        "version": "1.0.0",
        "debug": True
    },
    "database": {
        "host": "localhost",
        "port": 5432,
        "credentials": {
            "username": "admin",
            "password": "secret123"
        }
    },
    "api_keys": {
        "stripe": "sk_test_abc123",
        "sendgrid": "SG.xyz789"
    },
    "features": ["auth", "payments", "analytics"]
}

# Analyze configuration
analyzer = ConfigAnalyzer()
stats = analyzer.analyze_data(config)

# Print report
print(analyzer.generate_report())

Python Output:

Configuration Analysis Report
==================================================
Total values: 12
Maximum nesting depth: 3

Values by type:
  bool: 1
  int: 1
  str: 10

⚠️  Sensitive data found:
  root.database.credentials.username = *****
  root.database.credentials.password = *********
  root.api_keys.stripe = **************
  root.api_keys.sendgrid = *********
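
The same data can be analyzed straight from disk through analyze_file(); here is a brief sketch where config.json is a hypothetical file holding the dictionary above (a fresh analyzer is used because stats accumulate per instance):

import json

with open("config.json", "w") as f:
    json.dump(config, f)                    # write the example config to disk

file_analyzer = ConfigAnalyzer()
file_analyzer.analyze_file("config.json")
print(file_analyzer.generate_report())      # same report as above, loaded from the file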

Decision Framework: Recursive vs Iterative for Data Processing

| Criterion | Recursive Approach | Iterative Approach |
|-----------|--------------------|--------------------|
| Code clarity | ✓ Natural for hierarchical data | More verbose, explicit stack |
| Depth limit | ✗ Limited by Python (1000 default) | ✓ No inherent limit |
| Memory usage | ✗ Call stack overhead | ✓ Explicit stack, more control |
| Debugging | ✗ Harder to trace call stack | ✓ Easier to inspect state |
| Performance | Similar (slight overhead from calls) | Similar (slight overhead from stack ops) |
| Best for | Trees, JSON, XML, moderate depth | Very deep structures, graphs |

When to Choose Recursion for Data Processing

Choose recursion when:

- The data structure is naturally hierarchical (trees, JSON, XML)
- Maximum depth is reasonable (< 100 levels typically)
- Code clarity is more important than maximum performance
- You need to track path information naturally

Choose iteration when:

- Data might be arbitrarily deep
- Working with graphs (need explicit visited tracking anyway)
- Memory is extremely constrained
- Debugging is critical (easier to inspect an explicit stack)

Lessons Learned

  1. Start with the simplest case: Begin with basic recursion, add complexity only when needed
  2. Type checking is essential: Real-world data has mixed types—handle them explicitly
  3. Error handling is not optional: File systems and external data will fail—plan for it
  4. Path tracking enables debugging: Knowing where data came from is invaluable
  5. Accumulator pattern is powerful: Pass state down, aggregate results up
  6. Test with real data: Synthetic examples miss edge cases that real data exposes

Complexity Analysis: Final Implementation

Time Complexity: O(n), where n = total values in the nested structure

- Each value is visited exactly once
- Type checking: O(1) per value
- Path building: O(d) per value where d = depth (amortized O(1))
- Overall: O(n)

Space Complexity: O(d + n), where d = max depth and n = total values

- Call stack: O(d) for maximum nesting depth
- Results storage: O(n) for paths and values
- Stats dictionary: O(k) where k = number of unique types (effectively constant)
- Overall: O(d + n)

Recurrence Relation: T(n) = Σ T(children) + O(1)

- For a dict with k keys: T(n) = T(n₁) + T(n₂) + ... + T(nₖ) + O(1)
- For a list with m items: T(n) = T(n₁) + T(n₂) + ... + T(nₘ) + O(1)
- Solves to O(n) total work
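
To make the O(n) bound concrete, here is a small sketch (count_calls is a throwaway helper, not part of the analyzer) that counts recursive invocations over the example config; every container and primitive triggers exactly one call:

def count_calls(data):
    """Count how many recursive calls a full traversal makes."""
    calls = 1                                       # one call for this node
    if isinstance(data, dict):
        calls += sum(count_calls(v) for v in data.values())
    elif isinstance(data, list):
        calls += sum(count_calls(item) for item in data)
    return calls

print(count_calls(config))   # one call per node, so the work grows linearly with the structure's size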

Real-World Applications

This pattern and these techniques are used in:

  1. Configuration Management
     - Kubernetes config validation
     - Terraform state analysis
     - Application config auditing

  2. Data Processing Pipelines
     - ETL transformations
     - Data validation
     - Schema migration

  3. Security Tools
     - Secret scanning
     - Compliance checking
     - Vulnerability assessment

  4. Development Tools
     - Linters and formatters
     - Documentation generators
     - Code analysis tools

  5. System Administration
     - File system analysis
     - Log aggregation
     - Resource monitoring

The recursive approach to data processing is not just an academic exercise—it's a fundamental technique in production systems that handle hierarchical data every day.